feat(ai): add TaskTracker for task lifecycle state observability #746
Tejeshyewale wants to merge 46 commits into rocketride-org:develop from
Conversation
Fixed wording, numbering, and documentation clarity in README.
- Add task_tracker.py with TaskState enum, TaskRecord dataclass, TaskTracker class, and module-level tracker singleton
- Integrate tracker into task_engine.py at 6 lifecycle points:
  - on_starting: STARTING state set
  - on_initializing: subprocess about to spawn
  - on_running: subprocess confirmed alive
  - on_failed: startup exception caught
  - on_completed: clean termination
  - on_failed: abnormal termination
- Thread-safe via threading.Lock
- Zero new dependencies - stdlib only
📝 Walkthrough

Adds an ML sklearn prediction node (IGlobal, IInstance, PreProcessor, service metadata, requirements, docs) that manages a shared preprocessor and routes inference results, and introduces a task lifecycle tracking subsystem (TaskState, TaskRecord, TaskTracker, structured task logger) with integrations in the task engine.

Changes

ML Sklearn Prediction Node
Task Lifecycle Tracking
Sequence Diagram

```mermaid
sequenceDiagram
    actor Input as Question
    participant IInstance
    participant IGlobal
    participant PreProcessor
    participant Downstream
    Input->>IInstance: writeAnswers(question)
    IInstance->>IGlobal: ensure preprocessor exists
    IGlobal-->>IInstance: preprocessor
    IInstance->>IInstance: deep-copy question & extract text
    IInstance->>PreProcessor: process(text)
    PreProcessor-->>IInstance: processed_text
    IInstance->>Downstream: forward updated question
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
No description provided.
Actionable comments posted: 9
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@nodes/src/nodes/ml_sklearn/code.py`:
- Line 16: The __init__ method currently triggers lint errors: rename the unused
parameter config to _config (or prefix it with an underscore) to satisfy ARG002,
and add an explicit return type annotation "-> None" to the __init__ signature
to satisfy ANN204; update the signature in the constructor (def __init__(self,
_config: dict) -> None:) and keep usage unchanged if the value is intentionally
unused.
In `@nodes/src/nodes/ml_sklearn/README.md`:
- Line 7: The README's input description "text (number as string)" conflicts
with the services.json test fixture which uses "hello world"; update either the
README entry in nodes/src/nodes/ml_sklearn/README.md to describe the input as
"text (string)" for arbitrary strings, or change the services.json "test"
fixture to a numeric string like "250" so the example matches the README; ensure
the README and the "test" value in services.json are consistent.
In `@nodes/src/nodes/ml_sklearn/services.json`:
- Around line 30-43: The shape field in nodes/src/nodes/ml_sklearn/services.json
is using an inputs/outputs map that doesn't match the project's UI schema;
replace the current "shape": { "inputs": [...], "outputs": [...] } with an empty
array "shape": [] because ml_sklearn has no user-configurable fields; update the
shape value in that JSON object (referencing the existing "shape" key in the
ml_sklearn service definition) so the UI form renderer recognizes it as having
no sections.
- Around line 17-28: The preconfig block is malformed causing
Config.getNodeConfig (called from IGlobal.beginGlobal) to raise KeyError on
preconfig['profiles']; fix by making preconfig.default a string ("default"),
move the existing "ml_sklearn.default" profile object into preconfig.profiles as
a dictionary entry, and remove the top-level "profiles" sibling; ensure the
profile key matches the default name and keep the profile object structure
(e.g., object/properties) under preconfig.profiles.
- Around line 9-16: The "pipe" wrapper uses a non-standard nested lanes object
("answers": {"in": true, "out": true}) which prevents the pipeline from
registering the lane; replace that structure with the standard top-level lanes +
input arrays used elsewhere (e.g., agent_langchain/response): remove the nested
object under "pipe" and instead declare "lanes" as an array containing "answers"
and add an "input" array referencing "answers" so the pipeline framework
recognizes the answers lane.
In `@packages/ai/src/ai/modules/task/task_engine.py`:
- Line 1620: The tracker lifecycle calls in start_task() are inverted:
TaskTracker.on_running is invoked immediately after create_subprocess_exec but
on_initializing is called later; swap them so the order matches the documented
TaskState lifecycle (on_starting → on_initializing → on_running → on_completed).
Specifically, call tracker.on_initializing(self.id) right after the subprocess
is created (before stdio/metrics setup) to mark "subprocess about to be
spawned", and call tracker.on_running(self.id) only after stdio/metrics are set
up and the subprocess is confirmed alive; update both occurrences (the current
calls at the create_subprocess_exec location and the later stdio/metrics
completion location) to preserve the correct sequence.
- Around line 860-863: In _terminated(), change the exitCode check so
unknown/None is treated as failure: replace the current condition that relies on
falsiness of self._status.exitCode with an explicit check (e.g., if
self._stop_requested or self._status.exitCode is None or self._status.exitCode
!= 0) so that tracker.on_failed(self.id) is invoked when exitCode is None or
non-zero; leave tracker.on_completed(self.id) only for confirmed successful
exitCode == 0—this prevents a transient on_completed before the start_task
exception handler later calls tracker.on_failed.
- Around line 1699-1703: Replace the call to _logger.error(..., exc_info=True)
with _logger.exception(...) in the task startup error handling so the current
exception traceback is logged idiomatically; keep the same message string and
the extra={'task_id': self.id, 'step': 'error', 'error': str(e)} payload,
locating the call to _logger.error in task_engine.py (the task startup/error
handling block where self.id and e are referenced) and swap the logging method
to _logger.exception while preserving arguments.
In `@packages/ai/src/ai/modules/task/task_tracker.py`:
- Line 7: Replace deprecated typing aliases: remove "from typing import Dict,
Optional", change any "Dict[...]" usages to the built-in "dict[...]" generic,
and replace "Optional[T]" annotations with the union form "T | None" (e.g.,
update the TaskRecord.end_time annotation to "datetime | None"). Update
occurrences noted (around TaskRecord and other uses at the indicated locations)
and remove the now-unused Optional import.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 23bca35d-a375-4aa8-bd3d-7936b41d14a6
📒 Files selected for processing (9)
nodes/src/nodes/ml_sklearn/IGlobal.py
nodes/src/nodes/ml_sklearn/IInstance.py
nodes/src/nodes/ml_sklearn/README.md
nodes/src/nodes/ml_sklearn/__init__.py
nodes/src/nodes/ml_sklearn/code.py
nodes/src/nodes/ml_sklearn/requirements.txt
nodes/src/nodes/ml_sklearn/services.json
packages/ai/src/ai/modules/task/task_engine.py
packages/ai/src/ai/modules/task/task_tracker.py
```python
class PreProcessor:
    """Wraps a scikit-learn model/pipeline for text inference."""

    def __init__(self, config: dict):
```
Two Ruff linting failures on `__init__` will block CI.
- ARG002 — `config` is received but never read. Prefix with `_` to signal intentional non-use.
- ANN204 — `__init__` is missing a `-> None` return-type annotation.
🔧 Proposed fix

```diff
- def __init__(self, config: dict):
+ def __init__(self, _config: dict) -> None:
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
def __init__(self, _config: dict) -> None:
```
🧰 Tools
🪛 Ruff (0.15.12)
[warning] 16-16: Missing return type annotation for special method __init__
Add return type annotation: None
(ANN204)
[warning] 16-16: Unused method argument: config
(ARG002)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nodes/src/nodes/ml_sklearn/code.py` at line 16, The __init__ method currently
triggers lint errors: rename the unused parameter config to _config (or prefix
it with an underscore) to satisfy ARG002, and add an explicit return type
annotation "-> None" to the __init__ signature to satisfy ANN204; update the
signature in the constructor (def __init__(self, _config: dict) -> None:) and
keep usage unchanged if the value is intentionally unused.
```markdown
## Input

- text (number as string)
```
Input description is inconsistent with the test fixture in services.json.
The README states the input is a "number as string" (e.g., 250), but services.json uses "hello world" as the test payload. Either update the test fixture to a numeric string (e.g., "250") or broaden the description to "text (string)" if arbitrary strings are valid.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nodes/src/nodes/ml_sklearn/README.md` at line 7, The README's input
description "text (number as string)" conflicts with the services.json test
fixture which uses "hello world"; update either the README entry in
nodes/src/nodes/ml_sklearn/README.md to describe the input as "text (string)"
for arbitrary strings, or change the services.json "test" fixture to a numeric
string like "250" so the example matches the README; ensure the README and the
"test" value in services.json are consistent.
```json
"pipe": {
  "lanes": {
    "answers": {
      "in": true,
      "out": true
    }
  }
},
```
🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
# Verify: do any other "preprocessor" group nodes use the pipe.lanes structure?
# Expected: none — all should use top-level "lanes" + "input"
rg -l '"group":\s*"preprocessor"' --glob '*/services.json' | xargs -I{} sh -c 'echo "=== {} ==="; cat "{}"' | grep -A10 '"pipe"\|"lanes"'
```

Repository: rocketride-org/rocketride-server
Length of output: 225

🏁 Script executed:

```shell
# Find all services.json files in the repository
find . -name "services.json" -type f | head -20
```

Repository: rocketride-org/rocketride-server
Length of output: 978

🏁 Script executed:

```shell
# Read the ml_sklearn/services.json file to see the full context
cat -n ./nodes/src/nodes/ml_sklearn/services.json
```

Repository: rocketride-org/rocketride-server
Length of output: 1695

🏁 Script executed:

```shell
# Find all preprocessor group nodes and extract their pipe/lanes structures
rg '"group":\s*"preprocessor"' -l ./nodes/src/nodes/*/services.json | while read file; do
  echo "=== $file ==="
  # Extract the pipe or lanes section (simplified for readability)
  cat "$file" | jq -r 'to_entries[] | select(.value.group == "preprocessor") | .value | {pipe, lanes, input}' 2>/dev/null || echo "Error parsing"
done
```

Repository: rocketride-org/rocketride-server
Length of output: 269

🏁 Script executed:

```shell
# Broader search: find all preprocessor group nodes
find ./nodes/src/nodes/*/services.json -type f | xargs grep -l '"group":\s*"preprocessor"'
```

Repository: rocketride-org/rocketride-server
Length of output: 119

🏁 Script executed:

```shell
# Find and examine agent_langchain and response node structures
echo "=== agent_langchain ===" && cat ./nodes/src/nodes/agent_langchain/services.json | jq '.' | head -50
echo ""
echo "=== response ===" && cat ./nodes/src/nodes/response/services.json | jq '.' 2>/dev/null | head -50
```

Repository: rocketride-org/rocketride-server
Length of output: 1122

🏁 Script executed:

```shell
# Get the response node structure (if it exists)
find ./nodes/src/nodes -name "response" -type d | head -1
```

Repository: rocketride-org/rocketride-server
Length of output: 103

🏁 Script executed:

```shell
# Read the response node services.json
cat -n ./nodes/src/nodes/response/services.json
```

Repository: rocketride-org/rocketride-server
Length of output: 5344

🏁 Script executed:

```shell
# Check if any other node uses "pipe" wrapper
grep -r '"pipe"' ./nodes/src/nodes/*/services.json | head -20
```

Repository: rocketride-org/rocketride-server
Length of output: 137

🏁 Script executed:

```shell
# Verify the {"in": true, "out": true} format doesn't appear elsewhere
grep -r '"in":\s*true' ./nodes/src/nodes/*/services.json
```

Repository: rocketride-org/rocketride-server
Length of output: 151
Fix non-standard lane configuration structure in pipe wrapper.
The pipe wrapper with {"in": true, "out": true} format is unique to this file and doesn't match the codebase convention. Other nodes like agent_langchain and response use top-level "lanes" (with array values) paired with an "input" array. The current structure will likely prevent the pipeline framework from registering the answers lane correctly.
Use the standard structure:
🔧 Proposed fix
```diff
-  "pipe": {
-    "lanes": {
-      "answers": {
-        "in": true,
-        "out": true
-      }
-    }
-  },
+  "lanes": {
+    "answers": []
+  },
+  "input": [
+    {
+      "lane": "answers",
+      "output": []
+    }
+  ],
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nodes/src/nodes/ml_sklearn/services.json` around lines 9 - 16, The "pipe"
wrapper uses a non-standard nested lanes object ("answers": {"in": true, "out":
true}) which prevents the pipeline from registering the lane; replace that
structure with the standard top-level lanes + input arrays used elsewhere (e.g.,
agent_langchain/response): remove the nested object under "pipe" and instead
declare "lanes" as an array containing "answers" and add an "input" array
referencing "answers" so the pipeline framework recognizes the answers lane.
```json
"preconfig": {
  "default": {
    "object": "default",
    "properties": []
  }
},
"profiles": {
  "ml_sklearn.default": {
    "object": "default",
    "properties": []
  }
},
```
preconfig is structurally malformed — Config.getNodeConfig will throw KeyError: 'profiles' at runtime.
IGlobal.beginGlobal() calls Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig). That method (in packages/ai/src/ai/common/config.py) expects:
- `preconfig.default` to be a string (the default profile name, e.g. `"default"`)
- `preconfig.profiles` to be a dict of profile objects
Currently, preconfig.default is an object {"object": "default", "properties": []}, and the "profiles" dict is a top-level sibling of preconfig instead of nested inside it. When getNodeConfig executes preconfig['profiles'], it will raise KeyError: 'profiles', crashing any non-CONFIG-mode startup.
🐛 Proposed fix — restructure `preconfig` and remove misplaced `profiles`
```diff
-  "preconfig": {
-    "default": {
-      "object": "default",
-      "properties": []
-    }
-  },
-  "profiles": {
-    "ml_sklearn.default": {
-      "object": "default",
-      "properties": []
-    }
-  },
+  "preconfig": {
+    "default": "default",
+    "profiles": {
+      "default": {}
+    }
+  },
```
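The expected shape can be illustrated with a hypothetical condensation of the lookup described above (`get_node_config_sketch` is an illustrative stand-in for Config.getNodeConfig, not the real implementation):

```python
def get_node_config_sketch(preconfig: dict) -> dict:
    # "default" must name a profile; "profiles" maps names to profile objects.
    # A malformed preconfig (no "profiles" key) raises KeyError here, which is
    # exactly the failure mode the review describes.
    default_name = preconfig["default"]
    return preconfig["profiles"][default_name]
```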
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nodes/src/nodes/ml_sklearn/services.json` around lines 17 - 28, The preconfig
block is malformed causing Config.getNodeConfig (called from
IGlobal.beginGlobal) to raise KeyError on preconfig['profiles']; fix by making
preconfig.default a string ("default"), move the existing "ml_sklearn.default"
profile object into preconfig.profiles as a dictionary entry, and remove the
top-level "profiles" sibling; ensure the profile key matches the default name
and keep the profile object structure (e.g., object/properties) under
preconfig.profiles.
```json
"shape": {
  "inputs": [
    {
      "name": "answers",
      "type": "answers"
    }
  ],
  "outputs": [
    {
      "name": "answers",
      "type": "answers"
    }
  ]
},
```
shape uses the wrong format — it should be an array of section objects, not an inputs/outputs map.
In this codebase, shape is a UI form-rendering directive: an array of {section, title, properties} objects (see agent_langchain/services.json and extract_data/services.json). The current {"inputs": [...], "outputs": [...]} structure appears borrowed from a different schema convention and will not be processed correctly by the UI layer.
Since ml_sklearn has no user-configurable fields, shape should be an empty array:
🔧 Proposed fix
```diff
-  "shape": {
-    "inputs": [
-      {
-        "name": "answers",
-        "type": "answers"
-      }
-    ],
-    "outputs": [
-      {
-        "name": "answers",
-        "type": "answers"
-      }
-    ]
-  },
+  "shape": [],
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nodes/src/nodes/ml_sklearn/services.json` around lines 30 - 43, The shape
field in nodes/src/nodes/ml_sklearn/services.json is using an inputs/outputs map
that doesn't match the project's UI schema; replace the current "shape": {
"inputs": [...], "outputs": [...] } with an empty array "shape": [] because
ml_sklearn has no user-configurable fields; update the shape value in that JSON
object (referencing the existing "shape" key in the ml_sklearn service
definition) so the UI form renderer recognizes it as having no sections.
```python
if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):
    tracker.on_failed(self.id)
else:
    tracker.on_completed(self.id)
```
_terminated() may call on_completed for a startup failure, causing a transient incorrect state.
The condition on line 860:
```python
if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):
```

treats exitCode = None as falsy, so tracker.on_completed() is called when exitCode is None — which occurs when the subprocess was spawned but hasn't exited yet at the time _terminated() is invoked from the start_task exception handler.
The explicit tracker.on_failed(self.id) at line 1698 then overrides this, but between the two calls concurrent readers see a spurious COMPLETED state for a task that actually failed at startup.
The fix is to default to on_failed whenever the exit code is not a confirmed success:
🐛 Proposed fix
```diff
-        if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):
-            tracker.on_failed(self.id)
-        else:
-            tracker.on_completed(self.id)
+        if self._status.exitCode == 0 and not self._stop_requested:
+            tracker.on_completed(self.id)
+        else:
+            tracker.on_failed(self.id)
```

With this change, on_failed is always emitted when exitCode is None (unknown/still-running), and the explicit tracker.on_failed(self.id) at line 1698 becomes a safe, harmless redundant call instead of a critical correction.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/ai/src/ai/modules/task/task_engine.py` around lines 860 - 863, In
_terminated(), change the exitCode check so unknown/None is treated as failure:
replace the current condition that relies on falsiness of self._status.exitCode
with an explicit check (e.g., if self._stop_requested or self._status.exitCode
is None or self._status.exitCode != 0) so that tracker.on_failed(self.id) is
invoked when exitCode is None or non-zero; leave tracker.on_completed(self.id)
only for confirmed successful exitCode == 0—this prevents a transient
on_completed before the start_task exception handler later calls
tracker.on_failed.
```python
                'pid': self._engine_process.pid,
            },
        )
        tracker.on_running(self.id)
```
on_running and on_initializing are called in the wrong order, contradicting the documented lifecycle.
TaskTracker's docstring specifies the happy-path order as on_starting → on_initializing → on_running → on_completed, and the TaskState enum is declared in that same order. The semantic descriptions also confirm this — on_initializing is "subprocess about to be spawned" and on_running is "subprocess confirmed alive."
In start_task() the calls are inverted: on_running fires immediately after create_subprocess_exec (line 1620), and on_initializing fires after stdio/metrics setup (line 1651). Consumers calling tracker.get() or snapshot() will observe RUNNING before INITIALIZING, violating the documented contract.
🐛 Proposed fix — swap the two tracker calls
```diff
-        tracker.on_running(self.id)
+        tracker.on_initializing(self.id)
```

```diff
-        tracker.on_initializing(self.id)
+        tracker.on_running(self.id)
```

Also applies to: 1651-1651
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/ai/src/ai/modules/task/task_engine.py` at line 1620, The tracker
lifecycle calls in start_task() are inverted: TaskTracker.on_running is invoked
immediately after create_subprocess_exec but on_initializing is called later;
swap them so the order matches the documented TaskState lifecycle (on_starting →
on_initializing → on_running → on_completed). Specifically, call
tracker.on_initializing(self.id) right after the subprocess is created (before
stdio/metrics setup) to mark "subprocess about to be spawned", and call
tracker.on_running(self.id) only after stdio/metrics are set up and the
subprocess is confirmed alive; update both occurrences (the current calls at the
create_subprocess_exec location and the later stdio/metrics completion location)
to preserve the correct sequence.
```python
_logger.error(
    'Task startup failed',
    extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
    exc_info=True,
)
```
Use _logger.exception() instead of _logger.error(..., exc_info=True).
Ruff G201 flags this. logging.exception() is the idiomatic stdlib shorthand for logging at ERROR level with the current exception traceback.
♻️ Proposed fix
```diff
-            _logger.error(
-                'Task startup failed',
-                extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
-                exc_info=True,
-            )
+            _logger.exception(
+                'Task startup failed',
+                extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
+            )
```

🧰 Tools
+ )🧰 Tools
🪛 Ruff (0.15.12)
[warning] 1699-1699: Logging .exception(...) should be used instead of .error(..., exc_info=True)
(G201)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/ai/src/ai/modules/task/task_engine.py` around lines 1699 - 1703,
Replace the call to _logger.error(..., exc_info=True) with
_logger.exception(...) in the task startup error handling so the current
exception traceback is logged idiomatically; keep the same message string and
the extra={'task_id': self.id, 'step': 'error', 'error': str(e)} payload,
locating the call to _logger.error in task_engine.py (the task startup/error
handling block where self.id and e are referenced) and swap the logging method
to _logger.exception while preserving arguments.
```python
from dataclasses import dataclass, field
from enum import Enum
from time import monotonic
from typing import Dict, Optional
```
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Replace deprecated typing.Dict / typing.Optional with built-in generics.
Ruff UP035 flags line 7. On the Python 3.12 baseline used in this repo all three typing aliases are deprecated; prefer native syntax throughout.
♻️ Proposed refactor
```diff
-from typing import Dict, Optional
+from typing import Optional
```

```diff
-        self._records: Dict[str, TaskRecord] = {}
+        self._records: dict[str, TaskRecord] = {}
```

```diff
-    def get(self, task_id: str) -> Optional[TaskRecord]:
+    def get(self, task_id: str) -> TaskRecord | None:
```

```diff
-    def snapshot(self) -> Dict[str, dict]:
+    def snapshot(self) -> dict[str, dict]:
```

And update the end_time annotation in TaskRecord:

```diff
-    end_time: Optional[float]
+    end_time: float | None
```

After these changes the Optional import can be removed entirely.
Also applies to: 68-68, 98-98, 103-103
🧰 Tools
🪛 Ruff (0.15.12)
[warning] 7-7: typing.Dict is deprecated, use dict instead
(UP035)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/ai/src/ai/modules/task/task_tracker.py` at line 7, Replace
deprecated typing aliases: remove "from typing import Dict, Optional", change
any "Dict[...]" usages to the built-in "dict[...]" generic, and replace
"Optional[T]" annotations with the union form "T | None" (e.g., update the
TaskRecord.end_time annotation to "datetime | None"). Update occurrences noted
(around TaskRecord and other uses at the indicated locations) and remove the
now-unused Optional import.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/ai/src/ai/modules/task/task_logger.py`:
- Around line 26-28: Replace the explicit loop that builds payload from
record.__dict__ with a dict comprehension to improve performance: instead of
iterating with "for key, value in record.__dict__.items()" and testing "if key
not in self._RESERVED", create a new dict via comprehension filtering out keys
in self._RESERVED and then assign or update payload accordingly; target the code
that references record.__dict__, self._RESERVED, and payload in task_logger.py
(the loop shown) and ensure semantics remain identical (only non-reserved keys
copied).
- Around line 18-20: The timestamp field in task_logger.py is built with
time.strftime(time_format, time.gmtime(record.created)) which discards
fractional seconds from record.created; update the code that builds 'timestamp'
(the time.strftime call for the 'timestamp' key) to preserve sub-second
precision by formatting record.created with a function that includes
milliseconds (e.g., using datetime.fromtimestamp(record.created,
tz=timezone.utc).isoformat(timespec='milliseconds') or equivalent) so timestamps
include fractional seconds while keeping UTC/Zulu formatting.
- Around line 39-43: The current get_task_logger flow can add duplicate
StreamHandler instances due to a race between the if not logger.handlers check
and logger.addHandler(handler); to fix it, guard the handler-creation and add
sequence with a lock: introduce a module-level threading.Lock and wrap the block
that checks logger.handlers, creates the StreamHandler, sets
_StructuredFormatter, and calls logger.addHandler(handler) in that lock so only
one thread can perform setup for the logger; alternatively use
logger.manager.lock (the logging.Manager-provided lock) to synchronize the same
critical section around adding handlers to ensure get_task_logger and the logger
variable are configured exactly once.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2275cab2-c4e3-412a-96b4-1109ac76533a
📒 Files selected for processing (1)
packages/ai/src/ai/modules/task/task_logger.py
```python
'timestamp': time.strftime(
    '%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.created)
),
```
Timestamp truncates sub-second precision
time.gmtime(record.created) silently drops the fractional-seconds portion of record.created. In practice, multiple log records emitted within the same wall-clock second will share identical timestamps, making ordering ambiguous in the structured output.
⏱️ Proposed fix to include milliseconds
```diff
-            'timestamp': time.strftime(
-                '%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.created)
-            ),
+            'timestamp': '{}.{:03d}Z'.format(
+                time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(record.created)),
+                int((record.created % 1) * 1000),
+            ),
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/ai/src/ai/modules/task/task_logger.py` around lines 18 - 20, The
timestamp field in task_logger.py is built with time.strftime(time_format,
time.gmtime(record.created)) which discards fractional seconds from
record.created; update the code that builds 'timestamp' (the time.strftime call
for the 'timestamp' key) to preserve sub-second precision by formatting
record.created with a function that includes milliseconds (e.g., using
datetime.fromtimestamp(record.created,
tz=timezone.utc).isoformat(timespec='milliseconds') or equivalent) so timestamps
include fractional seconds while keeping UTC/Zulu formatting.
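Pulled out as a standalone helper (the name `iso_ts_ms` is illustrative), the millisecond formatting behaves like this:

```python
import time


def iso_ts_ms(created: float) -> str:
    # UTC ISO-8601 with millisecond precision, e.g. 2024-01-02T03:04:05.678Z.
    # The fractional part of the UNIX timestamp is kept instead of being
    # discarded by time.gmtime().
    return "{}.{:03d}Z".format(
        time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(created)),
        int((created % 1) * 1000),
    )
```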
```python
for key, value in record.__dict__.items():
    if key not in self._RESERVED:
        payload[key] = value
```
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Use a dict comprehension instead of a for loop (Ruff PERF403)
♻️ Proposed refactor
```diff
-        for key, value in record.__dict__.items():
-            if key not in self._RESERVED:
-                payload[key] = value
+        payload.update(
+            {k: v for k, v in record.__dict__.items() if k not in self._RESERVED}
+        )
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
payload.update(
    {k: v for k, v in record.__dict__.items() if k not in self._RESERVED}
)
🧰 Tools
🪛 Ruff (0.15.12)
[warning] 28-28: Use a dictionary comprehension instead of a for-loop
(PERF403)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/ai/src/ai/modules/task/task_logger.py` around lines 26 - 28, Replace
the explicit loop that builds payload from record.__dict__ with a dict
comprehension to improve performance: instead of iterating with "for key, value
in record.__dict__.items()" and testing "if key not in self._RESERVED", create a
new dict via comprehension filtering out keys in self._RESERVED and then assign
or update payload accordingly; target the code that references record.__dict__,
self._RESERVED, and payload in task_logger.py (the loop shown) and ensure
semantics remain identical (only non-reserved keys copied).
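A quick equivalence check of the loop and the suggested comprehension against a real `LogRecord`; the `_RESERVED` set here is an abbreviated stand-in, not the module's actual list:

```python
import logging

# Abbreviated, hypothetical reserved-key set for illustration only.
_RESERVED = {'name', 'msg', 'args', 'levelname', 'levelno', 'pathname',
             'created', 'msecs'}

record = logging.LogRecord(
    'tasks', logging.INFO, __file__, 1, 'task started', None, None
)
record.task_id = 'abc123'  # extra field, e.g. via logger.info(..., extra={...})

# Original loop form
payload_loop = {}
for key, value in record.__dict__.items():
    if key not in _RESERVED:
        payload_loop[key] = value

# Suggested comprehension form (what PERF403 asks for)
payload_comp = {k: v for k, v in record.__dict__.items() if k not in _RESERVED}

assert payload_loop == payload_comp
```

Only non-reserved keys are copied in both forms, so the refactor is behavior-preserving.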
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(_StructuredFormatter())
    logger.addHandler(handler)
    logger.propagate = False
Race condition: duplicate handlers under concurrent get_task_logger calls
The if not logger.handlers check and logger.addHandler(handler) are not atomic. Two threads can both observe an empty handler list simultaneously and each add a handler, resulting in duplicate StreamHandler instances on the same logger — every subsequent log call will then emit each line twice (or more).
The standard remedy is to guard the setup with an explicit module-level `threading.Lock` (or the logging module's internal lock) so the check-and-add sequence runs atomically; `logging.getLogger` already returns the same instance per name, so only the handler setup needs protection. The simplest safe pattern is:
🔒 Proposed fix using the logging module's internal lock
def get_task_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
- if not logger.handlers:
- handler = logging.StreamHandler()
- handler.setFormatter(_StructuredFormatter())
- logger.addHandler(handler)
- logger.propagate = False
+ logging._acquireLock() # noqa: SLF001 (stdlib internal, stable)
+ try:
+ if not logger.handlers:
+ handler = logging.StreamHandler()
+ handler.setFormatter(_StructuredFormatter())
+ logger.addHandler(handler)
+ logger.propagate = False
+ finally:
+ logging._releaseLock() # noqa: SLF001
return logger

Alternatively, if get_task_logger is only ever called at module-import time (single-threaded), document that constraint explicitly with a comment or assert, and note it is not safe for concurrent calls.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/ai/src/ai/modules/task/task_logger.py` around lines 39 - 43, The
current get_task_logger flow can add duplicate StreamHandler instances due to a
race between the if not logger.handlers check and logger.addHandler(handler); to
fix it, guard the handler-creation and add sequence with a lock: introduce a
module-level threading.Lock and wrap the block that checks logger.handlers,
creates the StreamHandler, sets _StructuredFormatter, and calls
logger.addHandler(handler) in that lock so only one thread can perform setup for
the logger; alternatively use logger.manager.lock (the logging.Manager-provided
lock) to synchronize the same critical section around adding handlers to ensure
get_task_logger and the logger variable are configured exactly once.
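A minimal sketch of the module-level-lock variant described in the prompt (preferable to `logging._acquireLock()`, a private API that was removed in Python 3.13); `_StructuredFormatter` is stubbed here so the example is self-contained:

```python
import logging
import threading

_setup_lock = threading.Lock()  # guards the check-and-add sequence

class _StructuredFormatter(logging.Formatter):
    """Stub standing in for the real structured formatter."""

def get_task_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:          # fast path once configured
        with _setup_lock:
            if not logger.handlers:  # re-check inside the lock
                handler = logging.StreamHandler()
                handler.setFormatter(_StructuredFormatter())
                logger.addHandler(handler)
                logger.propagate = False
    return logger

# Concurrent callers now configure the logger exactly once.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(8) as pool:
    list(pool.map(get_task_logger, ['worker'] * 32))
print(len(logging.getLogger('worker').handlers))  # 1
```

The double-checked pattern keeps the uncontended path lock-free while making duplicate handlers impossible under races.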
Summary
Introduces a lightweight, thread-safe `TaskTracker` that records the full lifecycle of every pipeline task along with timing metadata.
Builds on the structured logging added in #745.
Problem
Logs show what happened, but they cannot answer live questions about what each task is doing right now.
Solution
An in-process registry (the `tracker` singleton) that is updated at every lifecycle transition and can be queried at any time.
Changes
New file: `packages/ai/src/ai/modules/task/task_tracker.py`

- `TaskState` — `starting → initializing → running → completed/failed`
- `TaskRecord` — `status`, `start_time`, `end_time`, `last_update`, `elapsed_seconds`
- `TaskTracker` — `on_*` lifecycle hooks and query helpers
- `tracker` — module-level singleton

Modified:
`packages/ai/src/ai/modules/task/task_engine.py` — 6 tracker calls added, zero existing logic changed:

| Call site | Tracker hook |
| --- | --- |
| `start_task()` | `tracker.on_starting()` |
| `start_task()` | `tracker.on_initializing()` |
| `start_task()` | `tracker.on_running()` |
| `start_task()` `except` block | `tracker.on_failed()` |
| `_terminated()` (clean termination) | `tracker.on_completed()` |
| `_terminated()` (abnormal termination) | `tracker.on_failed()` |

Design Decisions

- `from .task_tracker import tracker` — no DI needed
- `threading.Lock` — safe for concurrent task lifecycle updates
- `monotonic()` timestamps — immune to system clock changes
- Zero new dependencies — stdlib only (`threading`, `dataclasses`, `enum`, `time`)
- No behavioral changes to `task_engine.py`

Usage Example
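The example body did not survive extraction, so below is a hypothetical, self-contained sketch of the API the description implies. State names, hook signatures, and the `get` query helper are guesses based on the lists above, not the merged code:

```python
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional

class TaskState(Enum):
    STARTING = 'starting'
    INITIALIZING = 'initializing'
    RUNNING = 'running'
    COMPLETED = 'completed'
    FAILED = 'failed'

@dataclass
class TaskRecord:
    status: TaskState
    start_time: float = field(default_factory=time.monotonic)
    end_time: Optional[float] = None
    last_update: float = field(default_factory=time.monotonic)

    @property
    def elapsed_seconds(self) -> float:
        end = self.end_time if self.end_time is not None else time.monotonic()
        return end - self.start_time

class TaskTracker:
    def __init__(self) -> None:
        self._lock = threading.Lock()  # serializes concurrent lifecycle updates
        self._records: Dict[str, TaskRecord] = {}

    def _set(self, task_id: str, state: TaskState, final: bool = False) -> None:
        with self._lock:
            rec = self._records.setdefault(task_id, TaskRecord(state))
            rec.status = state
            rec.last_update = time.monotonic()
            if final:
                rec.end_time = rec.last_update

    def on_starting(self, task_id): self._set(task_id, TaskState.STARTING)
    def on_initializing(self, task_id): self._set(task_id, TaskState.INITIALIZING)
    def on_running(self, task_id): self._set(task_id, TaskState.RUNNING)
    def on_completed(self, task_id): self._set(task_id, TaskState.COMPLETED, final=True)
    def on_failed(self, task_id): self._set(task_id, TaskState.FAILED, final=True)

    def get(self, task_id: str) -> Optional[TaskRecord]:
        with self._lock:
            return self._records.get(task_id)

tracker = TaskTracker()  # module-level singleton, as in the PR

# The lifecycle as task_engine.py would drive it:
tracker.on_starting('job-1')
tracker.on_initializing('job-1')
tracker.on_running('job-1')
tracker.on_completed('job-1')
print(tracker.get('job-1').status)  # TaskState.COMPLETED
```

Because `get` returns a snapshot of the record, a health endpoint or REPL can query task state at any time without touching the engine itself.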
Labels
`module:ai`, `feat`

Summary by CodeRabbit
New Features
Documentation
Chores